package com.stay4it.rxjava;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.stay4it.rxjava.bean.Course;
import com.stay4it.rxjava.bean.Entity;
import com.stay4it.rxjava.bean.Student;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

/**
 * RxJava转换操作符
 */
public class RxJava04 {
	static List<Student> studentList = new ArrayList<Student>(){
		{
			add(new Student("Stay", 28));
			add(new Student("谷歌小弟", 23));
			add(new Student("Star", 25));
		}
	};
	
	static Map<String, Course> couseMap = new HashMap<>();
	
	static {
		couseMap.put("Stay", new Course("语文", 2001));
		couseMap.put("谷歌小弟", new Course("数学", 2005));
		couseMap.put("Star", new Course("美术", 2009));
	}
	
	
	/**
	 * Map
	 * 
	 * 通过使用map中的方法对Observable中发射出来的所有数据进行变换
	 * 
	 * test1()方法是得到多个Student对象中的name，保存到nameList中
	 * 注意：接口Func1包装的是有返回值的方法。
	 * 
	 * 了解更多：http://blog.csdn.net/jdsjlzx/article/details/51493772
	 */
	private static void test1(){
		List<String> nameList = new ArrayList<>();
		Observable.from(studentList)
		.map(new Func1<Student, String>() {

			@Override
			public String call(Student student) {
				return student.name;
			}
		})
		.subscribe(new Subscriber<String>() {

			@Override
			public void onCompleted() {
				System.out.println("onCompleted nameList.size() = " + nameList.size());		
			}
			
			@Override
			public void onNext(String value) {
				System.out.println("onSuccess value = " + value);
				nameList.add(value);
			}

			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		
	}
	
	/**
	 * Map操作符连续使用
	 */
	private static void test2(){
		Observable.from(studentList)
		.map(new Func1<Student, Integer>() {
			
			@Override
			public Integer call(Student student) {
				return student.age;
			}
		})
		.map(new Func1<Integer, String>() {

			@Override
			public String call(Integer t) {
				return String.valueOf(t+10);
			}
		})
		.subscribe(new Subscriber<String>() {
			
			@Override
			public void onCompleted() {
				System.out.println("onCompleted ");
			}
			
			@Override
			public void onNext(String value) {
				System.out.println("onSuccess value = " + value);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		
	}
	
	/**
	 * Flatmap操作符
	 * FlatMap将一个发射数据的Observable变换为多个Observables，然后将它们发射的数据合并后放进一个单独的Observable
	 * 
	 * FlatMap是一个用处很大的操作符，可以将要数据根据你想要的规则进行转化后再发射出去。
	 * 其原理就是将这个Observable转化为多个以原Observable发射的数据作为源数据的Observable，
	 * 然后再将这多个Observable发射的数据整合发射出来，需要注意的是最后的顺序可能会交错地发射出来，
	 * 如果对顺序有严格的要求的话可以使用concatmap操作符。
	 * FlatMapIterable和FlatMap基本相同，不同之处为其转化的多个Observable是使用Iterable作为源数据的。
	 * 参考：http://blog.csdn.net/jdsjlzx/article/details/51493552
	 */
	private static void test3(){
		List<String> nameList = new ArrayList<>();
		Observable.from(studentList)
		.flatMap(new Func1<Student, Observable<Course>>() {

			@Override
			public Observable<Course> call(Student t) {
				Course course = couseMap.get(t.name);
				return Observable.just(course);
			}
		})
		.subscribe(new Subscriber<Course>() {
			
			@Override
			public void onCompleted() {
				System.out.println("onCompleted ");
			}
			
			@Override
			public void onNext(Course course) {
				System.out.println("onSuccess course = " + course);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		
	}
	
	/**
	 * FlatMap操作符
	 * FlatMap将一个发射数据的Observable变换为多个Observables，然后将它们发射的数据合并后放进一个单独的Observable
	 */
	private static void test4(){
		List<String> nameList = new ArrayList<>();
		Observable.from(studentList)
		.flatMap(new Func1<Student, Observable<Entity>>() {
			
			@Override
			public Observable<Entity> call(Student student) {
				Course course = couseMap.get(student.name);
				Entity entity = new Entity(course, student);
				return Observable.just(entity);
			}
		})
		.subscribe(new Subscriber<Entity>() {
			
			@Override
			public void onCompleted() {
			}
			
			@Override
			public void onNext(Entity entity) {
				System.out.println("onSuccess entity = " + entity);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		
	}
	
	/**
	 * ConcatMap操作符
	 * 类似于最简单版本的flatMap，但是它按次序连接而不是合并那些生成的Observables，然后产生自己的数据序列。
	 * 
	 */
	private static void test5(){
		Observable.from(studentList)
		.concatMap(new Func1<Student, Observable<Course>>() {

			@Override
			public Observable<Course> call(Student t) {
				Course course = couseMap.get(t.name);
				return Observable.just(course);
			}
		})
		.subscribe(new Subscriber<Course>() {
			
			@Override
			public void onCompleted() {
			}
			
			@Override
			public void onNext(Course course) {
				System.out.println("onSuccess course = " + course);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
	}
	
	/**
	 * flatMap与ConcatMap操作符比较
	 * 区别：
	 * 无序：FlatMap对这些Observables发射的数据做的是合并(merge)操作，因此它们可能是交错的。
	 * 有序：ConcatMap不会让变换后的Observables发射的数据交错，它按照严格的顺序发射这些数据。
	 * 
	 * 说明：在同步线程中，FlatMap和ConcactMap的执行结果是一样的（结果是有序的），
	 * 	    只有在异步线程中，FlatMap结果可能是无序的，而ConcactMap始终能保持有序的结果。
	 * 
	 * concatMap与flatMap操作符的比较 参见：http://blog.csdn.net/jdsjlzx/article/details/51508852
	 */
	private static void test6(){
		List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
		Observable.from(numbers)
		.flatMap(new Func1<Integer, Observable<Integer>>() {

			@Override
			public Observable<Integer> call(Integer t) {
				return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
				//return Observable.just(t);
			}
		})
		.subscribe(new Subscriber<Integer>() {
			
			@Override
			public void onCompleted() {
			}
			
			@Override
			public void onNext(Integer value) {
				System.out.println("flatMap onSuccess value = " + value);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		System.out.println("----------------------------");
		Observable.from(numbers)
		.concatMap(new Func1<Integer, Observable<Integer>>() {
			
			@Override
			public Observable<Integer> call(Integer t) {
				return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
				//return Observable.just(t);
			}
		})
		.subscribe(new Subscriber<Integer>() {
			
			@Override
			public void onCompleted() {
			}
			
			@Override
			public void onNext(Integer value) {
				System.out.println("concatMap onNext value = " + value);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
	}
	
	/**
	 * switchMap
	 * 解释：将Observable发射的数据集合变换为Observables集合，然后只发射这些Observables最近发射的数据
	 * 用法与FlatMap几乎一样，区别是SwitchMap操作符只会发射[emit]最近的Observables。
	 * 
	 * 当源Observable发射一个新的数据项时，如果旧数据项订阅还未完成，就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项.
	 * 
	 * 应用场景：http://blog.csdn.net/jdsjlzx/article/details/51730162
	 * 
	 * 逻辑推演：
	 * A --> 取消空的，没有可以取消的
	 * B-->  A1被取消
	 * C-->  B1被取消
	 * D-->  C1被取消
	 * E-->  D1被取消
	 * 最终输出E1
	 */
	private static void test7(){
		Observable.just("A", "B", "C", "D", "E")
		.switchMap(new Func1<String, Observable<String>>() {  
            @Override  
            public Observable<String> call(String s) {  
                return Observable.just(s+"1").subscribeOn(Schedulers.newThread()); //并发
                //return Observable.just(s+"1");  
            }  
        })
		.subscribe(new Observer<String>() {  
            @Override  
            public void onCompleted() {  
                System.out.println("switchMap onCompleted");
            }  
  
            @Override  
            public void onError(Throwable e) {  
                System.out.println("switchMap onError :" + e);
            }  
  
            @Override  
            public void onNext(String s) {  
            	System.out.println("switchMap Next :" + s);
            }  
        });  
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	/**
	 * switchMap与flatmap的区别
	 * 
	 * 说明：在同步线程中，switchMap发射[emit]所有的Observables，
	 * 	    在异步线程中，switchMap只会发射[emit]最近的Observables。
	 * 
	 */
	private static void test8(){
		ExecutorService service = Executors.newFixedThreadPool(10);
		List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
		Observable.from(numbers)
		.flatMap(new Func1<Integer, Observable<Integer>>() {
			
			@Override
			public Observable<Integer> call(Integer t) {
				return Observable.just(t).subscribeOn(Schedulers.from(service));
				//return Observable.just(t);
			}
		})
		.subscribe(new Subscriber<Integer>() {
			
			@Override
			public void onCompleted() {
			}
			
			@Override
			public void onNext(Integer value) {
				System.out.println("flatMap onNext value = " + value);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		System.out.println("----------------------------------");
		Observable.from(numbers)
		.switchMap(new Func1<Integer, Observable<Integer>>() {
			
			@Override
			public Observable<Integer> call(Integer t) {
				return Observable.just(t).subscribeOn(Schedulers.from(service));
				//return Observable.just(t);
			}
		})
		.subscribe(new Subscriber<Integer>() {
			
			@Override
			public void onCompleted() {
			}
			
			@Override
			public void onNext(Integer value) {
				System.out.println("switchMap2 onNext value = " + value);
			}
			
			@Override
			public void onError(Throwable error) {
				System.out.println("onError error = " + error);
			}
		});
		service.shutdown();
	}
	
	
	public static void main(String[] args) throws InterruptedException {
		test8();
	}
	
	
}
